 1.大数据高速计算引擎Spark(上)之Spark SQL中Spark SQL编程下的SQL语句
   
   总体而言：SparkSQL与HQL兼容；与HQL相比，SparkSQL更简洁。
   createTempView、createOrReplaceTempView、spark.sql("SQL")

package cn.lagou.sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

case class Info(id: String, tags: String)

object SQLDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getCanonicalName)
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("warn")
    import spark.implicits._

    // 准备数据
    val arr = Array("1 1,2,3", "2 2,3", "3 1,2")
    val rdd: RDD[Info] = spark.sparkContext.makeRDD(arr)
      .map { line =>
        val fields: Array[String] = line.split("\\s+")
        Info(fields(0), fields(1))
      }

    // 用SQL处理
    val ds: Dataset[Info] = spark.createDataset(rdd)
    ds.createOrReplaceTempView("t1")

    // 用SQL处理 -hive
    spark.sql(
      """
        |select id, tag
        |  from t1
        |       lateral view explode(split(tags, ",")) t2 as tag
        |""".stripMargin
    ).show

    // SparkSQL
    spark.sql(
      """
        |select id, explode(split(tags, ",")) tag
        |  from t1
        |""".stripMargin
    ).show

    spark.close()
  }
}


 2.输入与输出
   
   SparkSQL内建支持的数据源包括：Parquet、JSON、CSV、Avro、Images、BinaryFiles
(Spark 3.0)。其中Parquet是默认的数据源。
// 内部使用
DataFrameReader.format(args).option("key", "value").schema(args).load()
// 开发API
SparkSession.read
   
   可用的Option选项参见：
https://spark.apache.org/docs/2.4.5/api/java/org/apache/spark/sql/DataFrameReader.html

val df1 = spark.read.format("parquet").load("data/users.parquet")
// Use Parquet; you can omit format("parquet") if you wish as
// it's the default
val df2 = spark.read.load("data/users.parquet")

// Use CSV
val df3 = spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load("data/people1.csv")

// Use JSON
val df4 = spark.read.format("json")
.load("data/emp.json")

// 内部使用
DataFrameWriter.format(args)
.option(args)
.bucketBy(args)
.partitionBy(args)
.save(path)

// 开发API
DataFrame.write


package cn.lagou.sparksql

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object InputOutputFileDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Demo1")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("warn")

    // parquet
    import spark._
    val df1: DataFrame = spark.read.load("data/users.parquet")
    df1.createOrReplaceTempView("t1")
    df1.show

    sql(
      """
        |create or replace temporary view users
        |using parquet
        |options (path "data/users.parquet")
        |""".stripMargin
    )
    sql(
      """
        |select * from users
        |""".stripMargin
    ).show

    // parquet文件
    df1.write.format("parquet")
      .mode("overwrite")
      .save("data/parquet")

    // json文件
    val df3: DataFrame = spark.read.format("json").load("data/emp.json")
    df3.show

    sql(
      """
        |create or replace temporary view emp
        | using json
        |options (path "data/emp.json")
        |""".stripMargin)
    sql(
      """
        |select * from emp
        |""".stripMargin).write
      .format("json")
      .mode("overwrite")
      .save("data/json")

    // csv文件
    val df2 = spark.read.format("csv")
        .option("header", "true")
        .option("inferschema", "true")
        .load("data/people1.csv")
    df2.show()

    sql(
      """
        |create or replace temporary view people
        | using csv
        |options (path "data/people1.csv",
        |         header "true",
        |         inferschema "true")
        |""".stripMargin)

    sql("select * from people").write
        .format("csv")
        .mode("overwrite")
        .save("data/csv")

    // SparkSQL还支持使用JDBC的方式连接到外部数据源：
    val jdbcDF: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false")
      .option("user", "hive")
      .option("password", "12345678")
      .option("diiver", "com.mysql.jdbc.Driver")
      .option("dbtable", "lagou_product_info")
      .load()
    jdbcDF.show
    jdbcDF.write.format("jdbc")
      .option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false&characterEncoding=utf8")
      .option("user", "hive")
      .option("password", "12345678")
      .option("diiver", "com.mysql.jdbc.Driver")
      .option("dbtable", "lagou_product_info_back")
        .mode(SaveMode.Append)
        .save()
    spark.close()
  }

}
   备注：如果有中文注意表的字符集，否则会有乱码
       SaveMode.ErrorIfExists（默认）。若表存在，则会直接报异常，数据不能存入
数据库 
       SaveMode.Append。若表存在，则追加在该表中；若该表不存在，则会先创建表,
再插入数据
       SaveMode.Overwrite。先将已有的表及其数据全都删除，再重新创建该表，最后
插入新的数据
       SaveMode.Ignore。若表不存在，则创建表并存入数据；若表存在，直接跳过数
据的存储,不会报错
   
   -- 创建表
   create table lagou_product_info_backup as
select * from lagou_product_info;
   -- 检查表的字符集
   show create table lagou_product_info_back;
   show create table lagou_product_info;
   -- 修改表的字符集
   alter table lagou_product_info_back convert to character set utf8;